{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "# Batch Ingestion\n",
    "**This notebook aggregates raw features into new derived features that is used for Fraud Detection model training/inference.**\n",
    "\n",
    "---\n",
    "\n",
    "## Contents\n",
    "\n",
    "1. [Background](#Background)\n",
    "1. [Setup](#Setup)\n",
    "1. [Create PySpark Processing Script](#Create-PySpark-Processing-Script)\n",
    "1. [Run SageMaker Processing Job](#Run-SageMaker-Processing-Job)\n",
    "1. [Explore Aggregated Features](#Explore-Aggregated-Features)\n",
    "1. [Validate Feature Group for Records](#Validate-Feature-Group-for-Records)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "### Background\n",
    "\n",
    "- This notebook takes raw credit card transactions data (csv) generated by \n",
    "[notebook 0](./0_prepare_transactions_dataset.ipynb) and aggregates the raw features to create new features (ratios) via <b>SageMaker Processing</b> PySpark Job. These aggregated features alongside the raw original features will be leveraged in the training phase of a Credit Card Fraud Detection model in the next step (see notebook [notebook 3](./3_train_and_deploy_model.ipynb)).\n",
    "\n",
    "- As part of the Spark job, we also select the latest weekly aggregated features - `num_trans_last_1w` and `avg_amt_last_1w` grouped by `cc_num` (credit card number) and populate these features into the <b>SageMaker Online Feature Store</b> as a feature group. This feature group (`cc-agg-batch-fg`) was created in notebook [notebook 1](./1_setup.ipynb).\n",
    "\n",
    "- [Amazon SageMaker Processing](https://aws.amazon.com/about-aws/whats-new/2020/09/amazon-sagemaker-processing-now-supports-built-in-spark-containers-for-big-data-processing/) lets customers run analytics jobs for data engineering and model evaluation on Amazon SageMaker easily and at scale. It provides a fully managed Spark environment for data processing or feature engineering workloads.\n",
    "\n",
    "<img src=\"./images/batch_ingestion.png\" />"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "### Setup"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "#### Imports "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "from sagemaker.spark.processing import PySparkProcessor\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "import sagemaker\n",
    "import logging\n",
    "import random\n",
    "import boto3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "print(f'Using SageMaker version: {sagemaker.__version__}')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "#### Setup Logger"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "logger = logging.getLogger('sagemaker')\n",
    "logger.setLevel(logging.INFO)\n",
    "logger.addHandler(logging.StreamHandler())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "logger.info('[Batch Aggregation using SageMaker PySpark Processing Job]')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "#### Essentials"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "sagemaker_role = sagemaker.get_execution_role()\n",
    "BUCKET = sagemaker.Session().default_bucket()\n",
    "INPUT_KEY_PREFIX = 'raw'\n",
    "OUTPUT_KEY_PREFIX = 'aggregated'\n",
    "LOCAL_DIR = './data'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "### Create PySpark Script\n",
    "This PySpark script does the following:\n",
    "\n",
    "1. Aggregates raw features to derive new features (ratios).\n",
    "2. Saves the aggregated features alongside the original raw features into a CSV file and writes it to S3 - will be used in the next step for model training.\n",
    "3. Groups the aggregated features by credit card number and picks selected aggregated features to write to SageMaker Feature Store (Online). <br>\n",
    "<b>Note: </b> The feature group was created in the previous notebook (`1_setup.ipynb`)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "%%writefile batch_aggregation.py\n",
    "from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType\n",
    "from pyspark.sql.functions import desc, dense_rank\n",
    "from pyspark.sql import SparkSession, DataFrame\n",
    "from  argparse import Namespace, ArgumentParser\n",
    "from pyspark.sql.window import Window\n",
    "import argparse\n",
    "import logging\n",
    "import boto3\n",
    "import time\n",
    "import sys\n",
    "import os\n",
    "\n",
    "\n",
    "TOTAL_UNIQUE_USERS = 10000\n",
    "FEATURE_GROUP = 'cc-agg-batch-fg'\n",
    "\n",
    "logger = logging.getLogger('sagemaker')\n",
    "logger.setLevel(logging.INFO)\n",
    "logger.addHandler(logging.StreamHandler())\n",
    "\n",
    "\n",
    "feature_store_client = boto3.client(service_name='sagemaker-featurestore-runtime')\n",
    "\n",
    "\n",
    "def parse_args() -> Namespace:\n",
    "    parser = ArgumentParser(description='Spark Job Input and Output Args')\n",
    "    parser.add_argument('--s3_input_bucket', type=str, help='S3 Input Bucket')\n",
    "    parser.add_argument('--s3_input_key_prefix', type=str, help='S3 Input Key Prefix')\n",
    "    parser.add_argument('--s3_output_bucket', type=str, help='S3 Output Bucket')\n",
    "    parser.add_argument('--s3_output_key_prefix', type=str, help='S3 Output Key Prefix')\n",
    "    args = parser.parse_args()\n",
    "    return args\n",
    "    \n",
    "\n",
    "def define_schema() -> StructType:\n",
    "    schema = StructType([StructField('tid', StringType(), True),\n",
    "                         StructField('datetime', TimestampType(), True),\n",
    "                         StructField('cc_num', LongType(), True),\n",
    "                         StructField('amount', DoubleType(), True),\n",
    "                         StructField('fraud_label', StringType(), True)])\n",
    "    return schema\n",
    "\n",
    "\n",
    "def aggregate_features(args: Namespace, schema: StructType, spark: SparkSession) -> DataFrame:\n",
    "    logger.info('[Read Raw Transactions Data as Spark DataFrame]')\n",
    "    transactions_df = spark.read.csv(f's3a://{os.path.join(args.s3_input_bucket, args.s3_input_key_prefix)}', \\\n",
    "                                     header=False, \\\n",
    "                                     schema=schema)\n",
    "    logger.info('[Aggregate Transactions to Derive New Features using Spark SQL]')\n",
    "    query = \"\"\"\n",
    "    SELECT *, \\\n",
    "           avg_amt_last_10m/avg_amt_last_1w AS amt_ratio1, \\\n",
    "           amount/avg_amt_last_1w AS amt_ratio2, \\\n",
    "           num_trans_last_10m/num_trans_last_1w AS count_ratio \\\n",
    "    FROM \\\n",
    "        ( \\\n",
    "        SELECT *, \\\n",
    "               COUNT(*) OVER w1 as num_trans_last_10m, \\\n",
    "               AVG(amount) OVER w1 as avg_amt_last_10m, \\\n",
    "               COUNT(*) OVER w2 as num_trans_last_1w, \\\n",
    "               AVG(amount) OVER w2 as avg_amt_last_1w \\\n",
    "        FROM transactions_df \\\n",
    "        WINDOW \\\n",
    "               w1 AS (PARTITION BY cc_num order by cast(datetime AS timestamp) RANGE INTERVAL 10 MINUTE PRECEDING), \\\n",
    "               w2 AS (PARTITION BY cc_num order by cast(datetime AS timestamp) RANGE INTERVAL 1 WEEK PRECEDING) \\\n",
    "        ) \n",
    "    \"\"\"\n",
    "    transactions_df.registerTempTable('transactions_df')\n",
    "    aggregated_features = spark.sql(query)\n",
    "    return aggregated_features\n",
    "\n",
    "\n",
    "def write_to_s3(args: Namespace, aggregated_features: DataFrame) -> None:\n",
    "    logger.info('[Write Aggregated Features to S3]')\n",
    "    aggregated_features.coalesce(1) \\\n",
    "                       .write.format('com.databricks.spark.csv') \\\n",
    "                       .option('header', True) \\\n",
    "                       .mode('overwrite') \\\n",
    "                       .option('sep', ',') \\\n",
    "                       .save('s3a://' + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix))\n",
    "    \n",
    "def group_by_card_number(aggregated_features: DataFrame) -> DataFrame: \n",
    "    logger.info('[Group Aggregated Features by Card Number]')\n",
    "    window = Window.partitionBy('cc_num').orderBy(desc('datetime'))\n",
    "    sorted_df = aggregated_features.withColumn('rank', dense_rank().over(window))\n",
    "    grouped_df = sorted_df.filter(sorted_df.rank == 1).drop(sorted_df.rank)\n",
    "    sliced_df = grouped_df.select('cc_num', 'num_trans_last_1w', 'avg_amt_last_1w')\n",
    "    return sliced_df\n",
    "\n",
    "\n",
    "def transform_row(sliced_df: DataFrame) -> list:\n",
    "    logger.info('[Transform Spark DataFrame Row to SageMaker Feature Store Record]')\n",
    "    records = []\n",
    "    for row in sliced_df.rdd.collect():\n",
    "        record = []\n",
    "        cc_num, num_trans_last_1w, avg_amt_last_1w = row\n",
    "        if cc_num:\n",
    "            record.append({'ValueAsString': str(cc_num), 'FeatureName': 'cc_num'})\n",
    "            record.append({'ValueAsString': str(num_trans_last_1w), 'FeatureName': 'num_trans_last_1w'})\n",
    "            record.append({'ValueAsString': str(round(avg_amt_last_1w, 2)), 'FeatureName': 'avg_amt_last_1w'})\n",
    "            records.append(record)\n",
    "    return records\n",
    "\n",
    "\n",
    "def write_to_feature_store(records: list) -> None:\n",
    "    logger.info('[Write Grouped Features to SageMaker Online Feature Store]')\n",
    "    success, fail = 0, 0\n",
    "    for record in records:\n",
    "        event_time_feature = {\n",
    "                'FeatureName': 'trans_time',\n",
    "                'ValueAsString': str(int(round(time.time())))\n",
    "            }\n",
    "        record.append(event_time_feature)\n",
    "        response = feature_store_client.put_record(FeatureGroupName=FEATURE_GROUP, Record=record)\n",
    "        if response['ResponseMetadata']['HTTPStatusCode'] == 200:\n",
    "            success += 1\n",
    "        else:\n",
    "            fail += 1\n",
    "    logger.info('Success = {}'.format(success))\n",
    "    logger.info('Fail = {}'.format(fail))\n",
    "    assert success == TOTAL_UNIQUE_USERS\n",
    "    assert fail == 0\n",
    "\n",
    "\n",
    "def run_spark_job():\n",
    "    spark = SparkSession.builder.appName('PySparkJob').getOrCreate()\n",
    "    args = parse_args()\n",
    "    schema = define_schema()\n",
    "    aggregated_features = aggregate_features(args, schema, spark)\n",
    "    write_to_s3(args, aggregated_features)\n",
    "    sliced_df = group_by_card_number(aggregated_features)\n",
    "    records = transform_row(sliced_df)\n",
    "    write_to_feature_store(records)\n",
    "    \n",
    "    \n",
    "if __name__ == '__main__':\n",
    "    run_spark_job()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "### Run SageMaker Processing Job"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "spark_processor = PySparkProcessor(base_job_name='sagemaker-processing', \n",
    "                                   framework_version='2.4', # spark version\n",
    "                                   role=sagemaker_role, \n",
    "                                   instance_count=1, \n",
    "                                   instance_type='ml.r5.4xlarge', \n",
    "                                   env={'AWS_DEFAULT_REGION': boto3.Session().region_name},\n",
    "                                   max_runtime_in_seconds=1200)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "spark_processor.run(submit_app='batch_aggregation.py', \n",
    "                    arguments=['--s3_input_bucket', BUCKET, \n",
    "                               '--s3_input_key_prefix', INPUT_KEY_PREFIX, \n",
    "                               '--s3_output_bucket', BUCKET, \n",
    "                               '--s3_output_key_prefix', OUTPUT_KEY_PREFIX],\n",
    "                    spark_event_logs_s3_uri='s3://{}/logs'.format(BUCKET),\n",
    "                    logs=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "### Explore Aggregated Features \n",
    "<p> The SageMaker Processing Job above creates the aggregated features alongside the raw features and writes it to S3. \n",
    "Let us verify this output using the code below and prep it to be used in the next step for model training.</p>\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "Copy results csv from S3 to local directory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "!rm {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part*.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "!aws s3 cp s3://{BUCKET}/{OUTPUT_KEY_PREFIX}/ {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/ --recursive --exclude '_SUCCESS'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "!mv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part*.csv {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "agg_features = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv')\n",
    "agg_features.dropna(inplace=True)\n",
    "agg_features['cc_num'] = agg_features['cc_num'].astype(np.int64)\n",
    "agg_features['fraud_label'] = agg_features['fraud_label'].astype(np.int64)\n",
    "agg_features.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "agg_features.to_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv', index=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "Remove the intermediate `part.csv` file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "!rm {LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/part.csv"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "### Validate Feature Group for Records\n",
    "Let's randomly pick N credit card numbers from the `processing_output.csv` and verify if records exist in the feature group `cc-agg-batch-fg` for these card numbers."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "N = 3 # number of random records to validate\n",
    "FEATURE_GROUP = 'cc-agg-batch-fg'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "processing_out_df = pd.read_csv(f'{LOCAL_DIR}/{OUTPUT_KEY_PREFIX}/processing_output.csv')\n",
    "processing_out_df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "cc_nums = random.sample(processing_out_df['cc_num'].tolist(), N)\n",
    "cc_nums"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "source": [
    "Using SageMaker Feature Store run-time client, we can verify if records exist in the feature group for the picked `cc_nums` "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "feature_store_client = boto3.Session().client(service_name='sagemaker-featurestore-runtime')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": [
    "success, fail = 0, 0\n",
    "for cc_num in cc_nums:\n",
    "    response = feature_store_client.get_record(FeatureGroupName=FEATURE_GROUP, \n",
    "                                               RecordIdentifierValueAsString=str(cc_num))\n",
    "    if response['ResponseMetadata']['HTTPStatusCode'] == 200 and 'Record' in response.keys():\n",
    "        success += 1\n",
    "        print(response['Record'])\n",
    "    else:\n",
    "        print(response)\n",
    "        fail += 1\n",
    "assert success == N"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "button": false,
    "new_sheet": false,
    "run_control": {
     "read_only": false
    }
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
